{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from collections import deque\n",
    "from functools import partial\n",
    "from itertools import cycle\n",
    "\n",
    "import pandas as pd\n",
    "\n",
    "import datashader as ds\n",
    "import datashader.transfer_functions as tf\n",
    "from datashader.colors import viridis\n",
    "\n",
    "from streamz import Stream"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def taxi_trips_stream(source='data/nyc_taxi.csv', frequency='T'):\n",
    "    \"\"\"Generate dataframes grouped by given frequency\"\"\"\n",
    "    def get_group(resampler, key):\n",
    "        try:\n",
    "            df = resampler.get_group(key)\n",
    "            df.reset_index(drop=True)\n",
    "        except KeyError:\n",
    "            df = pd.DataFrame()\n",
    "        return df\n",
    "\n",
    "    df = pd.read_csv(source,\n",
    "                     infer_datetime_format=True,\n",
    "                     parse_dates=['tpep_pickup_datetime', 'tpep_pickup_datetime'])\n",
    "    df = df.set_index('tpep_pickup_datetime', drop=True)\n",
    "    df = df.sort_index()\n",
    "    r = df.resample(frequency)\n",
    "    chunks = [get_group(r, g) for g in sorted(r.groups)]\n",
    "    indices = cycle(range(len(chunks)))\n",
    "    while True:\n",
    "        yield chunks[next(indices)]"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Create streams"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Given a stream of dataframes representing NYC taxi data, we create four streams: two streams are sliding window aggregations over some time period, while two other streams track the cumulative average for a particular value. The pipeline visualization below shows each step that makes up each stream.\n",
    "\n",
    "For each aggregation stream, the steps are 1) aggregate each dataframe using a Datashader reduction, 2) keep a sliding window of those aggregations, and 3) combine the sliding window collection into an image. The first stream creates a two-day sliding window aggregation, while the second stream creates a 1-week sliding window aggregation.\n",
    "\n",
    "For each cumulative average stream, we track the cumulative sum of each value along with the number of cumulative data points.\n",
    "\n",
    "We use the primitives given in the `streamz` library to accomplish this. `aggregated_sliding_window_image_queue` creates each aggregation stream. `cumulative_mean_queue` creates each cumulative average stream, but this will likely be replaced by a native `streamz.StreamingDataFrame` container when ready. Each stream will place its final result into a double-ended queue, which is used to keep a history of previous results. By default, we only keep the most recent."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def aggregate_df(df, cvs, x, y, agg=None):\n",
    "    return df.index.min(), df.index.max(), cvs.points(df, x, y, agg)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def aggregate_images(iterable, cmap):\n",
    "    \"\"\"Sum the aggregates of a window and shade the total as one image.\n",
    "\n",
    "    ``iterable`` is an indexable sequence of ``(start, end, aggregate)``\n",
    "    tuples. The image name joins the first ten characters of the first\n",
    "    start and of the last end.\n",
    "    \"\"\"\n",
    "    first_start = str(iterable[0][0])\n",
    "    last_end = str(iterable[-1][1])\n",
    "    name = \"{:.10} - {:.10}\".format(first_start, last_end)\n",
    "    aggregates = [entry[2] for entry in iterable]\n",
    "    total = sum(aggregates)\n",
    "    return tf.shade(total, cmap=cmap, name=name)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def aggregated_sliding_window_image_queue(source, agg1, agg2, window=1, history=1):\n",
    "    q = deque(maxlen=history)\n",
    "    s = source.map(agg1).sliding_window(window)\n",
    "    s.map(agg2).sink(q.append)\n",
    "    return q"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def cumulative_mean_queue(source, column, history=1):\n",
    "    def accumulator(acc, df):\n",
    "        n, total, oldest = acc\n",
    "        if not oldest:\n",
    "            oldest = df.index.min()\n",
    "        return n + 1, total + df[column].sum(), oldest, df.index.max()\n",
    "    \n",
    "    def merge(value):\n",
    "        n, total, oldest, latest = value\n",
    "        return oldest, latest, total / n\n",
    "\n",
    "    q = deque(maxlen=history)\n",
    "    source.accumulate(accumulator, start=(0, 0, None)).map(merge).sink(q.append)\n",
    "    return q"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def show_queue(q, column):\n",
    "    pd.options.display.float_format = '{:.2f}'.format\n",
    "    return pd.DataFrame(list(q), columns=['start', 'end', column])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Plot extent shared by every aggregation in this notebook.\n",
    "# NOTE(review): values look like projected pickup_x / pickup_y coordinates - confirm units.\n",
    "x_range = (-8243204.0, -8226511.0)\n",
    "y_range = (4968192.0, 4982886.0)\n",
    "\n",
    "cvs = ds.Canvas(\n",
    "    plot_width=800,\n",
    "    plot_height=600,\n",
    "    x_range=x_range,\n",
    "    y_range=y_range,\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Helper functions for useful aggregations.\n",
    "# The canvas is bound by keyword: each helper is called with the dataframe as\n",
    "# its only positional argument, which must land on `df`, the first parameter\n",
    "# of `aggregate_df`. Binding `cvs` positionally would swap the two.\n",
    "min_amount     = partial(aggregate_df, cvs=cvs, x='pickup_x', y='pickup_y', agg=ds.min('total_amount'))\n",
    "max_amount     = partial(aggregate_df, cvs=cvs, x='pickup_x', y='pickup_y', agg=ds.max('total_amount'))\n",
    "mean_amount    = partial(aggregate_df, cvs=cvs, x='pickup_x', y='pickup_y', agg=ds.mean('total_amount'))\n",
    "sum_amount     = partial(aggregate_df, cvs=cvs, x='pickup_x', y='pickup_y', agg=ds.sum('total_amount'))\n",
    "max_passengers = partial(aggregate_df, cvs=cvs, x='pickup_x', y='pickup_y', agg=ds.max('passenger_count'))\n",
    "sum_passengers = partial(aggregate_df, cvs=cvs, x='pickup_x', y='pickup_y', agg=ds.sum('passenger_count'))\n",
    "sum_pickups    = partial(aggregate_df, cvs=cvs, x='pickup_x', y='pickup_y', agg=ds.count())\n",
    "\n",
    "# Combines a window of aggregates into one image shaded with viridis.\n",
    "reduce_viridis = partial(aggregate_images, cmap=viridis)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "source = Stream()\n",
    "\n",
    "# Sliding-window images: a 2-element window keeping the last 6 images, and a\n",
    "# 7-element window keeping only the latest image (default history).\n",
    "q_days = aggregated_sliding_window_image_queue(\n",
    "    source, max_amount, reduce_viridis, window=2, history=6)\n",
    "q_week = aggregated_sliding_window_image_queue(\n",
    "    source, max_amount, reduce_viridis, window=7)\n",
    "\n",
    "# Cumulative means, each keeping the last 7 results.\n",
    "q_avg_passengers = cumulative_mean_queue(source, column='passenger_count', history=7)\n",
    "q_avg_amount = cumulative_mean_queue(source, column='total_amount', history=7)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "source.visualize()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Simplifying stream creation"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As you can see in the previous section, there are a few areas to improve upon:\n",
    "\n",
    "- less code/boilerplate\n",
    "- hide individual steps seen in stream diagram\n",
    "- encapsulate separate stream construction methods into helper classes\n",
    "- separate stream creation and stream sink\n",
    "- allow for partial results from sliding windows (not currently supported by `streamz`)\n",
    "- output results into other collections besides queues\n",
    "\n",
    "By subclassing `streamz.Stream`, we've accomplished the above without sacrificing readability."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class SlidingWindowImageAggregate(Stream):\n",
    "    \"\"\"Stream node emitting a shaded image of the last ``n`` aggregates.\n",
    "\n",
    "    Every incoming dataframe is reduced with ``canvas.points(df, x, y, agg)``\n",
    "    and cached (at most ``n`` entries). After each update the cached\n",
    "    aggregates are summed, shaded and emitted, so images are produced even\n",
    "    while the window is still filling up.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    source\n",
    "        Upstream node, handed to ``Stream.__init__``.\n",
    "    canvas\n",
    "        Object providing ``points(df, x, y, agg)``.\n",
    "    x, y\n",
    "        Column names forwarded to ``canvas.points``.\n",
    "    agg\n",
    "        Reduction forwarded to ``canvas.points``.\n",
    "    n : int\n",
    "        Maximum number of aggregates kept in the window.\n",
    "    cmap\n",
    "        Colormap forwarded to ``tf.shade``. When ``None`` the argument is\n",
    "        omitted so that ``tf.shade`` falls back to its own default.\n",
    "    bgcolor\n",
    "        Background color forwarded to ``tf.set_background``.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, source, canvas, x, y, agg, n=7, cmap=None, bgcolor='black'):\n",
    "        # Set internal streamz instance variables to control names in diagram\n",
    "        self.n = n\n",
    "\n",
    "        def aggregate_df(df):\n",
    "            # (index minimum, index maximum, aggregate) for one dataframe\n",
    "            return df.index.min(), df.index.max(), canvas.points(df, x, y, agg)\n",
    "\n",
    "        def aggregate_images(iterable):\n",
    "            # Name the image after the first start and the last end in the window.\n",
    "            name = \"{:.10} - {:.10}\".format(str(iterable[0][0]), str(iterable[-1][1]))\n",
    "            total = sum(item[2] for item in iterable)\n",
    "            # Passing cmap=None through would override tf.shade's default\n",
    "            # colormap with an unusable value, so only forward a real one.\n",
    "            shade_kwargs = {'name': name}\n",
    "            if cmap is not None:\n",
    "                shade_kwargs['cmap'] = cmap\n",
    "            return tf.set_background(tf.shade(total, **shade_kwargs), color=bgcolor)\n",
    "\n",
    "        self.cache = deque(maxlen=n)\n",
    "        self.agg1 = aggregate_df\n",
    "        self.agg2 = aggregate_images\n",
    "\n",
    "        Stream.__init__(self, source)\n",
    "\n",
    "    def update(self, x, who=None):\n",
    "        \"\"\"Aggregate dataframe ``x``, cache it and emit the image of the window.\"\"\"\n",
    "        self.cache.append(self.agg1(x))\n",
    "        return self.emit(self.agg2(tuple(self.cache)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class CumulativeMean(Stream):\n",
    "    \"\"\"Stream node emitting the cumulative mean of one dataframe column.\n",
    "\n",
    "    For every incoming dataframe the node adds the number of rows and the\n",
    "    sum of ``column`` to its running state and emits\n",
    "    ``(oldest, latest, mean)``: the minimum index of the first dataframe\n",
    "    seen, the maximum index of the current one and the running total\n",
    "    divided by the number of rows seen so far.\n",
    "\n",
    "    Parameters\n",
    "    ----------\n",
    "    source\n",
    "        Upstream node, handed to ``Stream.__init__``.\n",
    "    column\n",
    "        Name of the dataframe column to average.\n",
    "    \"\"\"\n",
    "\n",
    "    def __init__(self, source, column):\n",
    "        # Set internal streamz instance variables to control names in diagram\n",
    "        self.str_list = ['column']\n",
    "        self.column = column\n",
    "\n",
    "        self.count = 0      # number of rows (data points) seen so far\n",
    "        self.total = 0      # running sum of `column`\n",
    "        self.oldest = None  # minimum index of the first dataframe seen\n",
    "\n",
    "        Stream.__init__(self, source)\n",
    "\n",
    "    def update(self, x, who=None):\n",
    "        \"\"\"Fold dataframe ``x`` into the running state and emit the new mean.\"\"\"\n",
    "        # `is None` rather than truthiness: a falsy index value is still valid.\n",
    "        if self.oldest is None:\n",
    "            self.oldest = x.index.min()\n",
    "        # Count data points (rows), not dataframes, so total / count is a mean.\n",
    "        self.count += len(x)\n",
    "        self.total += x[self.column].sum()\n",
    "        # No rows seen yet: report NaN instead of dividing by zero.\n",
    "        mean = self.total / self.count if self.count else float('nan')\n",
    "        return self.emit((self.oldest, x.index.max(), mean))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "source = Stream()\n",
    "\n",
    "cvs = ds.Canvas(plot_width=800, plot_height=600, x_range=x_range, y_range=y_range)\n",
    "\n",
    "# 2-element sliding window; the queue keeps the last 6 images.\n",
    "q_days = deque(maxlen=6)\n",
    "s_days = SlidingWindowImageAggregate(\n",
    "    source, cvs,\n",
    "    x='pickup_x', y='pickup_y', agg=ds.max('total_amount'),\n",
    "    n=2, cmap=viridis,\n",
    ")\n",
    "s_days.sink(q_days.append)\n",
    "\n",
    "# 7-element sliding window; the queue keeps only the latest image.\n",
    "q_week = deque(maxlen=1)\n",
    "s_week = SlidingWindowImageAggregate(\n",
    "    source, cvs,\n",
    "    x='pickup_x', y='pickup_y', agg=ds.max('total_amount'),\n",
    "    n=7, cmap=viridis,\n",
    ")\n",
    "s_week.sink(q_week.append)\n",
    "\n",
    "# Cumulative means; each queue keeps the last 7 results.\n",
    "q_avg_passengers = deque(maxlen=7)\n",
    "s_avg_passengers = CumulativeMean(source, column='passenger_count')\n",
    "s_avg_passengers.sink(q_avg_passengers.append)\n",
    "\n",
    "q_avg_amount = deque(maxlen=7)\n",
    "s_avg_amount = CumulativeMean(source, column='total_amount')\n",
    "s_avg_amount.sink(q_avg_amount.append)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "source.visualize()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Push data through streams"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We initially push 3 days' worth of dataframes through the streams to view partial results."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# One dataframe per 'D' bin; push the first three into the pipelines.\n",
    "trips_per_day = taxi_trips_stream(frequency='D')\n",
    "for day in range(3):\n",
    "    daily_trips = next(trips_per_day)\n",
    "    source.emit(daily_trips)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*list(q_week))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Push four more dataframes through the pipelines.\n",
    "for day in range(4):\n",
    "    daily_trips = next(trips_per_day)\n",
    "    source.emit(daily_trips)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*list(q_week))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Cumulative average of passengers (ordered by oldest first)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "show_queue(q_avg_passengers, 'cumulative average passengers')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Cumulative average of total fare (ordered by oldest first)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "show_queue(q_avg_amount, 'cumulative average total fare')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### History of 2-day aggregations (ordered by oldest first)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*list(q_days))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Current 1-week aggregation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*list(q_week))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now we get the next day's worth of data and see how the streams have updated."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "source.emit(next(trips_per_day))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Cumulative average of passengers (ordered by oldest first)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "show_queue(q_avg_passengers, 'cumulative average passengers')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Cumulative average of total fare (ordered by oldest first)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "show_queue(q_avg_amount, 'cumulative average total fare')"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### History of 2-day aggregations (ordered by oldest first)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*list(q_days))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Current 1-week aggregation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tf.Images(*list(q_week))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python",
   "pygments_lexer": "ipython3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
